package com.nx.arch.transactionmsg;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.druid.pool.DruidDataSource;
import com.nx.arch.transactionmsg.model.TxMsgDataSource;
import com.nx.arch.transactionmsg.model.Msg;
import com.nx.arch.transactionmsg.model.MsgInfo;
import com.nx.arch.transactionmsg.utils.Config;

/**
 * Storage layer (DAO equivalent) for transactional messages.
 *
 * <p>An instance builds one Druid connection pool per configured database in
 * {@link #init(Config)} and keeps the pools in a map keyed by JDBC URL. The table
 * name and the rendered SQL statements are {@code static}, so they are shared by
 * every instance of this class.
 *
 * <pre>
 * Change history
 *     version      author       date          description
 *     ----------------------------------------------
 *     1.0.0        庄梦蝶殇      2020-04-13
 *     ----------------------------------------------
 * </pre>
 *
 * @author 庄梦蝶殇 linhuaichuan@naixuejiaoyu.com
 * @version 1.0.0 (created 2020-04-13 17:49:08)
 */
public class MsgStorage {
    private static final Logger log = LoggerFactory.getLogger(MsgStorage.class);
    
    /**
     * Transactional message status: waiting to be sent.
     */
    protected static final int MSG_STATUS_WAITING = 1;
    
    /**
     * Transactional message status: sent.
     */
    protected static final int MSG_STATUS_SEND = 2;
    
    /**
     * JDBC driver class configured on every pool.
     */
    private static final String DB_DRIVER_CLASS = "com.mysql.jdbc.Driver";
    
    /**
     * Milliseconds per day; declared as a long so the retention computation cannot overflow int.
     */
    private static final long MILLIS_PER_DAY = 86400000L;
    
    /**
     * Scan window for waiting messages: 10 minutes, in milliseconds.
     */
    private static final long MINUTE_TIME_DIFF = 1000L * 60 * 10;
    
    // Immutable SQL templates; %s is the table name. The rendered statements live in the
    // mutable static fields further down so that initSql() can be called repeatedly.
    
    /**
     * Insert template.
     */
    private static final String INSERT_SQL_TEMPLATE = "insert into %s(content,topic,tag,status,delay) values(?,?,?,?,?) ";
    
    /**
     * Select-by-id template.
     */
    private static final String SELECT_BY_ID_SQL_TEMPLATE = "select id,content,topic,tag,status,create_time,delay from %s where id=? ";
    
    /**
     * Template selecting the smallest id having a given status.
     */
    private static final String SELECT_MIN_ID_OF_WAITING_SQL_TEMPLATE = "select min(id) from %s where status=? ";
    
    /**
     * Template updating the status of one message.
     */
    private static final String UPDATE_STATUS_SQL_TEMPLATE = "update %s set status=? where id=?";
    
    /**
     * Template listing waiting messages.
     */
    private static final String SELECT_WAITING_MSG_SQL_TEMPLATE = "select id,content,topic,tag,status,create_time,delay from %s where status=? and create_time >= ? order by id limit ?";
    
    /**
     * Template listing waiting messages restricted to topics; the IN list is appended at query time.
     */
    private static final String SELECT_WAITING_MSG_WITH_TOPICS_SQL_TEMPLATE = "select id,content,topic,tag,status,create_time,delay from %s where status=?  and create_time >= ? and topic in ";
    
    /**
     * Delete template.
     */
    private static final String DELETE_MSG_SQL_TEMPLATE = "delete from %s where status=? and create_time <=?  limit ?";
    
    /**
     * Template checking that the table has a column named delay.
     */
    private static final String CHECK_DELAY_COLUMN_SQL_TEMPLATE = "SELECT count(column_name) FROM information_schema.columns WHERE table_schema = DATABASE() and table_name = '%s' AND column_name = 'delay'";
    
    /**
     * Message table name; starts as the default and is replaced by the configured name in init.
     */
    private static String tableName = "mq_messages";
    
    // Rendered statements. They are pre-rendered for the default table so that the static
    // insertMsg never executes a raw "%s" template, and re-rendered by initSql().
    private static String insertSQL = String.format(INSERT_SQL_TEMPLATE, tableName);
    
    private static String selectByIdSQL = String.format(SELECT_BY_ID_SQL_TEMPLATE, tableName);
    
    private static String selectMinIdOfWaitingSQL = String.format(SELECT_MIN_ID_OF_WAITING_SQL_TEMPLATE, tableName);
    
    private static String updateStatusSQL = String.format(UPDATE_STATUS_SQL_TEMPLATE, tableName);
    
    private static String selectWaitingMsgSQL = String.format(SELECT_WAITING_MSG_SQL_TEMPLATE, tableName);
    
    private static String selectWaitingMsgWithTopicsSQL = String.format(SELECT_WAITING_MSG_WITH_TOPICS_SQL_TEMPLATE, tableName);
    
    private static String deleteMsgSQL = String.format(DELETE_MSG_SQL_TEMPLATE, tableName);
    
    private static String checkDelayColumnSQL = String.format(CHECK_DELAY_COLUMN_SQL_TEMPLATE, tableName);
    
    /**
     * Retention of sent messages in milliseconds; computed in init from the configured number of days.
     */
    private static long dayTimeDiff;
    
    private List<TxMsgDataSource> dbDataSources;
    
    private HashMap<String, DataSource> dataSourcesMap;
    
    private Config config;
    
    /**
     * Topic list; when non-empty, getWaitingMsg only returns messages of these topics.
     */
    private List<String> topicLists;
    
    /**
     * Creates the storage; no connection pool is built until {@link #init(Config)} is called.
     *
     * @param dbDataSources connection settings of the databases to manage
     * @param topicLists topics used to filter waiting messages; may be null or empty for no filter
     */
    public MsgStorage(List<TxMsgDataSource> dbDataSources, List<String> topicLists) {
        this.dbDataSources = dbDataSources;
        this.dataSourcesMap = new HashMap<String, DataSource>();
        this.topicLists = topicLists;
        log.info("MsgStorage topicLists {}", topicLists);
    }
    
    /**
     * Applies the configuration, renders the SQL statements and builds one Druid pool per
     * configured database, verifying for each that the message table has a delay column.
     *
     * @param config client configuration (retention days, table name, pool sizing)
     * @throws Exception if a connection cannot be obtained or the delay column is missing
     */
    public void init(Config config)
        throws Exception {
        log.info("start init MsgStorage db Size {} msg Store {} day", dbDataSources.size(), config.getHistoryMsgStoreTime());
        this.config = config;
        // Long multiplication: with int operands the product overflows from 25 days upwards.
        dayTimeDiff = config.getHistoryMsgStoreTime() * MILLIS_PER_DAY;
        String configuredTable = config.getMsgTableName();
        if (configuredTable != null && configuredTable.trim().length() > 0) {
            tableName = configuredTable;
        } else {
            // A null/blank name would render statements against a table literally named "null".
            log.warn("msg table name is not configured, keep using table {}", tableName);
        }
        log.info("delete msg dayTime diff {} msgTable {} ", dayTimeDiff, config.getMsgTableName());
        initSql();
        for (TxMsgDataSource dbSrc : dbDataSources) {
            DruidDataSource result = new DruidDataSource();
            boolean registered = false;
            try {
                result.setDriverClassName(DB_DRIVER_CLASS);
                result.setUrl(dbSrc.getUrl());
                result.setUsername(dbSrc.getUsername());
                result.setPassword(dbSrc.getPassword());
                result.setInitialSize(this.config.getMinIdleConnectionNum());
                result.setMinIdle(this.config.getMinIdleConnectionNum());
                result.setMaxWait(this.config.getMaxWaitTime());
                result.setMaxActive(this.config.getMaxActiveConnectionNum());
                // MySQL drops connections that stay idle past its timeout, so validate idle ones.
                result.setTestWhileIdle(true);
                result.setValidationQuery("SELECT 1 FROM DUAL;");
                checkDelayColumn(result);
                // Original author's note: the url configured through
                // com.nx.arch.transactionmsg.DBDataSource has to match the connection used by
                // the business code that sends the transactional message, otherwise the message
                // cannot be sent (pools are looked up by the url reported by that connection).
                dataSourcesMap.put(dbSrc.getUrl(), result);
                registered = true;
            } finally {
                if (!registered) {
                    // The pool never reached the map, so close() would not release it.
                    result.close();
                }
            }
        }
        log.info("init MsgStorage success");
    }
    
    /**
     * Renders every SQL statement for the current table name. Callers may change the table
     * name, and one database may hold several message tables. The templates are never
     * modified, so calling this method again after the table name changed takes effect.
     */
    public void initSql() {
        insertSQL = String.format(INSERT_SQL_TEMPLATE, tableName);
        log.info("insertSQL {}", insertSQL);
        selectByIdSQL = String.format(SELECT_BY_ID_SQL_TEMPLATE, tableName);
        log.info("selectByIdSQL {}", selectByIdSQL);
        selectMinIdOfWaitingSQL = String.format(SELECT_MIN_ID_OF_WAITING_SQL_TEMPLATE, tableName);
        updateStatusSQL = String.format(UPDATE_STATUS_SQL_TEMPLATE, tableName);
        log.info("updateStatusSQL {}", updateStatusSQL);
        selectWaitingMsgSQL = String.format(SELECT_WAITING_MSG_SQL_TEMPLATE, tableName);
        selectWaitingMsgWithTopicsSQL = String.format(SELECT_WAITING_MSG_WITH_TOPICS_SQL_TEMPLATE, tableName);
        deleteMsgSQL = String.format(DELETE_MSG_SQL_TEMPLATE, tableName);
        checkDelayColumnSQL = String.format(CHECK_DELAY_COLUMN_SQL_TEMPLATE, tableName);
    }
    
    /**
     * Checks that the message table has a column named delay.
     *
     * @param dataSource data source to check
     * @throws Exception if the column is missing or the check query fails
     */
    private void checkDelayColumn(DataSource dataSource)
        throws Exception {
        Connection con = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;
        try {
            con = dataSource.getConnection();
            psmt = con.prepareStatement(checkDelayColumnSQL);
            rs = psmt.executeQuery();
            boolean delayColumnExists = rs.next() && rs.getLong(1) > 0;
            if (!delayColumnExists) {
                throw new Exception("TransactionMsgClient has been upgraded, its accessing db's table " + tableName + " should add a delay named column, please contact with dba to alter the table");
            }
        } finally {
            closeResultSet(rs);
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Closes every Druid pool held in the data source map. Entries that are not Druid pools
     * (possible through {@link #setDataSourcesMap(HashMap)}) are left untouched.
     */
    public void close() {
        log.info("start close MsgStorage");
        for (DataSource dataSrc : dataSourcesMap.values()) {
            if (dataSrc instanceof DruidDataSource) {
                ((DruidDataSource)dataSrc).close();
            }
        }
    }
    
    /**
     * Inserts a waiting message using the caller's connection. The connection is neither
     * committed nor closed here; it stays under the caller's control.
     *
     * @param con connection supplied and managed by the business code
     * @param content message body
     * @param topic topic
     * @param tag tag
     * @param delay delay (in seconds according to the original author's note)
     * @return pair of generated id (null when the driver returned no key) and the JDBC URL
     *         reported by the connection metadata
     * @throws SQLException if the insert fails
     */
    public static Map.Entry<Long, String> insertMsg(Connection con, String content, String topic, String tag, int delay)
        throws SQLException {
        PreparedStatement psmt = null;
        ResultSet results = null;
        try {
            psmt = con.prepareStatement(insertSQL, Statement.RETURN_GENERATED_KEYS);
            String url = con.getMetaData().getURL();
            psmt.setString(1, content);
            psmt.setString(2, topic);
            psmt.setString(3, tag);
            psmt.setInt(4, MSG_STATUS_WAITING);
            psmt.setInt(5, delay);
            psmt.executeUpdate();
            results = psmt.getGeneratedKeys();
            Long id = null;
            if (results.next()) {
                id = results.getLong(1);
            }
            return new AbstractMap.SimpleEntry<Long, String>(id, url);
        } finally {
            closeResultSet(results);
            closePreparedStatement(psmt);
        }
    }
    
    /**
     * Loads one message by id from the database identified by the message's url.
     * (Open question left by the original author: is a batch variant needed?)
     *
     * @param msg id/url pair; null yields null
     * @return the stored message, or null when no row matches (if several rows match, the
     *         last one read is returned)
     * @throws SQLException if no pool is registered for the url or the query fails
     */
    public MsgInfo getMsgById(Msg msg)
        throws SQLException {
        if (msg == null) {
            return null;
        }
        Long id = msg.getId();
        DataSource dataSrc = requireDataSource(msg.getUrl());
        Connection con = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;
        try {
            con = dataSrc.getConnection();
            psmt = con.prepareStatement(selectByIdSQL);
            psmt.setLong(1, id);
            rs = psmt.executeQuery();
            MsgInfo msgInfo = null;
            while (rs.next()) {
                msgInfo = readMsgInfo(rs);
            }
            return msgInfo;
        } finally {
            closeResultSet(rs);
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Marks a message as sent. The database is resolved from the message's url, which is
     * what makes this usable with several databases (sharding).
     *
     * @param msg id/url pair of the message
     * @return number of affected rows
     * @throws SQLException if no pool is registered for the url or the update fails
     */
    public int updateSendMsg(Msg msg)
        throws SQLException {
        DataSource dataSrc = requireDataSource(msg.getUrl());
        return markAsSent(dataSrc, msg.getId());
    }
    
    /**
     * Marks a message as sent in the given database.
     *
     * @param dataSrc data source
     * @param id primary key of the message
     * @return number of affected rows
     * @throws SQLException if the update fails
     */
    public int updateMsgStatus(DataSource dataSrc, Long id)
        throws SQLException {
        return markAsSent(dataSrc, id);
    }
    
    /**
     * Returns the smallest id among waiting messages.
     *
     * <p>The value is read with {@code ResultSet.getLong}, which reports SQL NULL as 0, so
     * when {@code min(id)} is NULL (no waiting message) the result is 0, not null.
     *
     * @param dataSrc data source
     * @return smallest waiting id; 0 when min(id) is NULL; null only if the query returns no row
     * @throws SQLException if the query fails
     */
    public Long getMinIdOfWaitingMsg(DataSource dataSrc)
        throws SQLException {
        Connection con = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;
        try {
            con = dataSrc.getConnection();
            psmt = con.prepareStatement(selectMinIdOfWaitingSQL);
            psmt.setInt(1, MSG_STATUS_WAITING);
            rs = psmt.executeQuery();
            Long minId = null;
            if (rs.next()) {
                minId = rs.getLong(1);
            }
            return minId;
        } finally {
            closeResultSet(rs);
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Lists waiting messages created within the last 10 minutes, ordered by id, optionally
     * restricted to the configured topics.
     *
     * <p>Original author's note: at first the most recent messages were skipped to avoid
     * duplicates (the offline scan and the real-time path may handle the same message);
     * later, to optimize the SQL, only the last 10 minutes are queried. Known limitation:
     * a message that is not picked up within 10 minutes is never scanned again. Every
     * message in waiting status should be scanned; this still needs to be changed.
     *
     * @param dataSrc data source
     * @param pageSize maximum number of rows to return
     * @return waiting messages, possibly empty
     * @throws SQLException if the query fails
     */
    public List<MsgInfo> getWaitingMsg(DataSource dataSrc, int pageSize)
        throws SQLException {
        List<String> topics = topicLists;
        boolean filterByTopic = topics != null && !topics.isEmpty();
        Connection con = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;
        try {
            con = dataSrc.getConnection();
            String sql = filterByTopic ? buildWaitingMsgWithTopicsSql(topics.size()) : selectWaitingMsgSQL;
            psmt = con.prepareStatement(sql);
            int index = 1;
            psmt.setInt(index++, MSG_STATUS_WAITING);
            psmt.setTimestamp(index++, getSomeMinuteBeforeTimeStamp());
            if (filterByTopic) {
                for (String topic : topics) {
                    psmt.setString(index++, topic);
                }
            }
            // The limit is always the last placeholder, with or without the topic filter.
            psmt.setInt(index, pageSize);
            rs = psmt.executeQuery();
            List<MsgInfo> list = new ArrayList<MsgInfo>(pageSize);
            while (rs.next()) {
                list.add(readMsgInfo(rs));
            }
            return list;
        } finally {
            closeResultSet(rs);
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Deletes sent messages older than the configured retention period.
     *
     * @param dataSrc data source
     * @param limitNum maximum number of rows deleted by this call
     * @return number of deleted rows
     * @throws SQLException if the delete fails
     */
    public int deleteSendedMsg(DataSource dataSrc, int limitNum)
        throws SQLException {
        Connection con = null;
        PreparedStatement psmt = null;
        try {
            con = dataSrc.getConnection();
            psmt = con.prepareStatement(deleteMsgSQL);
            psmt.setInt(1, MSG_STATUS_SEND);
            psmt.setTimestamp(2, getSomeDayBeforeTimeStamp());
            psmt.setInt(3, limitNum);
            return psmt.executeUpdate();
        } finally {
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Runs the status update shared by updateSendMsg and updateMsgStatus.
     */
    private int markAsSent(DataSource dataSrc, Long id)
        throws SQLException {
        Connection con = null;
        PreparedStatement psmt = null;
        try {
            con = dataSrc.getConnection();
            psmt = con.prepareStatement(updateStatusSQL);
            psmt.setInt(1, MSG_STATUS_SEND);
            psmt.setLong(2, id);
            return psmt.executeUpdate();
        } finally {
            closePreparedStatement(psmt);
            closeConnection(con);
        }
    }
    
    /**
     * Looks up the pool registered for a url, failing with a descriptive SQLException
     * instead of a NullPointerException when there is none.
     */
    private DataSource requireDataSource(String url)
        throws SQLException {
        DataSource dataSrc = dataSourcesMap.get(url);
        if (dataSrc == null) {
            // Drop the query string: it may carry credentials that must not reach logs.
            String shownUrl = url;
            if (url != null && url.indexOf('?') >= 0) {
                shownUrl = url.substring(0, url.indexOf('?'));
            }
            throw new SQLException("no data source registered for url " + shownUrl);
        }
        return dataSrc;
    }
    
    /**
     * Appends " ( ? , ? ... ) order by id limit ? ;" with topicCount placeholders to the
     * topic-filtered statement.
     */
    private String buildWaitingMsgWithTopicsSql(int topicCount) {
        StringBuilder sb = new StringBuilder(selectWaitingMsgWithTopicsSQL);
        sb.append(" ( ");
        for (int i = 0; i < topicCount; i++) {
            sb.append(i < topicCount - 1 ? " ? ," : " ? ");
        }
        sb.append(" ) ");
        sb.append(" order by id limit ? ;");
        return sb.toString();
    }
    
    /**
     * Maps the current row (id, content, topic, tag, status, create_time, delay) to a MsgInfo.
     */
    private static MsgInfo readMsgInfo(ResultSet rs)
        throws SQLException {
        MsgInfo msgInfo = new MsgInfo();
        msgInfo.setId(rs.getLong(1));
        msgInfo.setContent(rs.getString(2));
        msgInfo.setTopic(rs.getString(3));
        msgInfo.setTag(rs.getString(4));
        msgInfo.setStatus(rs.getInt(5));
        msgInfo.setCreateTime(rs.getTimestamp(6));
        msgInfo.setDelay(rs.getInt(7));
        return msgInfo;
    }
    
    /**
     * Returns the current time minus the configured retention period.
     */
    private Timestamp getSomeDayBeforeTimeStamp() {
        return new Timestamp(System.currentTimeMillis() - dayTimeDiff);
    }
    
    /**
     * Returns the current time minus the 10 minute scan window.
     */
    private Timestamp getSomeMinuteBeforeTimeStamp() {
        return new Timestamp(System.currentTimeMillis() - MINUTE_TIME_DIFF);
    }
    
    protected HashMap<String, DataSource> getDataSourcesMap() {
        return dataSourcesMap;
    }
    
    protected void setDataSourcesMap(HashMap<String, DataSource> dataSourcesMap) {
        this.dataSourcesMap = dataSourcesMap;
    }
    
    public static String getTablename() {
        return tableName;
    }
    
    public List<TxMsgDataSource> getDbDataSources() {
        return dbDataSources;
    }
    
    public void setDbDataSources(List<TxMsgDataSource> dbDataSources) {
        this.dbDataSources = dbDataSources;
    }
    
    /**
     * Closes a result set, logging instead of propagating a failure; null is ignored.
     *
     * @param rs result set to close, may be null
     */
    public static void closeResultSet(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                log.error("close Connection ResultSet error {} ", rs, e);
            }
        }
    }
    
    /**
     * Closes a statement, logging instead of propagating a failure; null is ignored.
     *
     * @param psmt statement to close, may be null
     */
    public static void closePreparedStatement(PreparedStatement psmt) {
        if (psmt != null) {
            try {
                psmt.close();
            } catch (SQLException e) {
                log.error("close Connection PreparedStatement {} error ", psmt, e);
            }
        }
    }
    
    /**
     * Closes a connection, logging instead of propagating a failure so that an error while
     * closing cannot replace the exception (or result) of the statement that just ran.
     */
    private static void closeConnection(Connection con) {
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                log.error("close Connection error {} ", con, e);
            }
        }
    }
    
    /**
     * Tells whether a topic is part of the configured topic list.
     *
     * @param topic topic to look up
     * @return true when topic is non-null and contained in the list; false otherwise,
     *         including when no topic list was configured
     */
    public boolean isInTopicLists(String topic) {
        return topic != null && topicLists != null && topicLists.contains(topic);
    }
    
}
